package com.zenitera.bigdata.tableapiflinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * 事件时间 rowtime()
 * 声明一个额外的字段来作为事件时间字段
 */
public class Flink03_TimeAttributes_02 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("create table sensor(" +
                "id string," +
                "ts bigint," +
                "vc int, " +
                "rowtime as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss'))," +
                "watermark for rowtime as rowtime - interval '5' second)" +
                "with("
                + "'connector' = 'filesystem',"
                + "'path' = 'input/sensor.txt',"
                + "'format' = 'csv'"
                + ")");

        tableEnv.sqlQuery("select * from sensor").execute().print();
    }
}

/*
input/sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
--------------------------------
+----+--------------------------------+----------------------+-------------+-------------------------+
| op |                             id |                   ts |          vc |                 rowtime |
+----+--------------------------------+----------------------+-------------+-------------------------+
| +I |                       sensor_1 |                    1 |          10 | 1970-01-01 08:00:00.000 |
| +I |                       sensor_1 |                    2 |          20 | 1970-01-01 08:00:00.000 |
| +I |                       sensor_2 |                    4 |          30 | 1970-01-01 08:00:00.000 |
| +I |                       sensor_1 |                    4 |         400 | 1970-01-01 08:00:00.000 |
| +I |                       sensor_2 |                    5 |          50 | 1970-01-01 08:00:00.000 |
| +I |                       sensor_2 |                    6 |          60 | 1970-01-01 08:00:00.000 |
+----+--------------------------------+----------------------+-------------+-------------------------+
 */